Efficient Parallel Processing in Python with concurrent.futures

Introduction

  • ThreadPoolExecutor and ProcessPoolExecutor are classes from Python’s concurrent.futures module, which provides a high-level interface for concurrent programming.

  • ThreadPoolExecutor uses a pool of threads to execute tasks concurrently, making it suitable for I/O-bound operations where tasks spend time waiting for external resources. It allows for efficient multitasking by leveraging threads within the same process.

  • ProcessPoolExecutor employs a pool of separate processes to handle tasks, ideal for CPU-bound operations that require substantial computation. Each process runs independently with its own memory space, avoiding the Global Interpreter Lock (GIL) and enabling true parallelism.
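
To make the CPU-bound case concrete, here is a small illustrative sketch (separate from the sales example developed below) in which a pure-computation function is handed to ProcessPoolExecutor; because each worker is a separate process, the GIL does not serialize the work:

import concurrent.futures

def count_down(n):
    # Pure CPU work: no I/O and no sleeping
    while n > 0:
        n -= 1
    return n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Each call runs in its own process, so the GIL does not serialize them
        results = list(executor.map(count_down, [10_000_000] * 4))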

Importing libraries

import pandas as pd
import concurrent.futures
import time

Importing data

The sample data includes weekly sales figures and the associated marketing spend across channels such as branded and non-branded search, Facebook, print, and more, captured over five years of weekly observations.
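
The loading step itself is not shown here; a minimal sketch, assuming the data sits in a CSV file (the filename weekly_sales.csv is hypothetical), would be:

# Load the weekly sales and marketing-spend data into a DataFrame
df = pd.read_csv("weekly_sales.csv")  # hypothetical filename
df.head()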

Week sales branded_search_spend nonbranded_search_spend facebook_spend print_spend ooh_spend tv_spend radio_spend
0 7/23/17 58850.0 1528.8 463.32 802.620 0 0 0 0
1 7/30/17 62050.0 1575.6 468.00 819.312 0 0 0 0
2 8/6/17 59388.0 1544.4 477.36 749.034 0 0 0 0
3 8/13/17 56964.0 1528.8 468.00 741.468 0 0 0 0
4 8/20/17 53460.0 1560.0 458.64 811.200 0 0 0 0

Function to get summary statistics

The function uses the describe() method from pandas to generate summary statistics for the specified columns in the DataFrame. This method provides key statistics such as mean, standard deviation, min, and max values. A time.sleep(n) call simulates a time-consuming step so that the three execution strategies below can be compared on equal footing.

# Task function: Perform data analysis (summary statistics for each task)
def task(n):
    time.sleep(n)  # Simulating a time-consuming task
    summary_stats = df[['sales', 'branded_search_spend', 'nonbranded_search_spend', 'facebook_spend']].describe()
    summary_stats = summary_stats.round(2)
    return summary_stats

1. Function using the normal (sequential) method

# Number of tasks and task duration
num_tasks = 5
task_duration = 1  # 1 second for each task

# Measure sequential execution time
start_time = time.time()
for _ in range(num_tasks):
    result = task(task_duration)
sequential_time = time.time() - start_time

2. Function using ThreadPoolExecutor()

ThreadPoolExecutor is a class from the concurrent.futures module that allows you to manage a pool of threads for concurrent execution.

Code Explanation:

  1. ThreadPoolExecutor() creates a thread pool for concurrent execution.
  2. executor.submit(task, task_duration) schedules the task function to run with task_duration as its argument, repeating for num_tasks.
  3. concurrent.futures.wait(futures) waits for all scheduled tasks to complete.

# Measure execution time using ThreadPoolExecutor
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
    concurrent.futures.wait(futures)
thread_pool_time = time.time() - start_time

Note: You can specify the maximum number of threads in the pool by passing the max_workers parameter to ThreadPoolExecutor() (e.g., max_workers=4).
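
As an illustrative variant (the worker count of 4 and the result collection are not part of the timed code above), max_workers can be combined with as_completed() to read each task's result as it finishes:

# Illustrative variant: cap the pool at 4 threads and collect each task's result
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
    summaries = []
    # as_completed() yields futures in the order they finish
    for future in concurrent.futures.as_completed(futures):
        summaries.append(future.result())  # re-raises any exception from the task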

3. Function using ProcessPoolExecutor()

ProcessPoolExecutor is also a class from the concurrent.futures module that manages a pool of processes for parallel execution.

# Measure execution time using ProcessPoolExecutor
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
    concurrent.futures.wait(futures)
process_pool_time = time.time() - start_time

Note: Just like ThreadPoolExecutor, ProcessPoolExecutor() also accepts the max_workers parameter (e.g., max_workers=4).
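
A minimal sketch of the same timing block with an explicit worker count. When ProcessPoolExecutor is used in a script, wrap the driving code in an if __name__ == "__main__" guard, since worker processes may re-import the module on start-up (Windows and macOS spawn workers by default), and note that the task function and its arguments must be picklable:

# Illustrative variant: explicit worker count, guarded for script execution
if __name__ == "__main__":
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
        concurrent.futures.wait(futures)
    process_pool_time = time.time() - start_time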

Displaying Output

print("\nSummary statistics from one of the tasks:")
result

Summary statistics from one of the tasks:
sales branded_search_spend nonbranded_search_spend facebook_spend
count 260.00 260.00 260.00 260.00
mean 68107.55 2033.68 402.38 1015.26
std 19608.83 902.96 200.05 453.05
min 29088.00 1375.92 229.32 660.23
25% 54405.00 1556.10 238.68 772.04
50% 64442.07 1606.80 351.00 827.19
75% 83872.11 1928.16 478.38 997.54
max 113762.88 7800.00 1093.69 3900.00
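
Note that result above comes from the last sequential call to task(); the pooled runs return their summaries through the futures. A short sketch, reusing the futures list from the blocks above:

# Collect the summaries computed by the pooled tasks
summaries = [future.result() for future in futures]
print(summaries[0])  # same describe() output, produced by a worker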

Displaying the timing results for all three methods

# Display the results
print(f"Sequential execution time: {sequential_time:.2f} seconds")
print(f"ThreadPoolExecutor time: {thread_pool_time:.2f} seconds")
print(f"ProcessPoolExecutor time: {process_pool_time:.2f} seconds")
Method                 Time taken
Sequential execution   5.06 seconds
ThreadPoolExecutor     1.05 seconds
ProcessPoolExecutor    0.27 seconds

Inference

ThreadPoolExecutor cut execution time from 5.06 seconds to 1.05 seconds by overlapping the tasks' waiting time, and ProcessPoolExecutor reduced it further to 0.27 seconds in this run; because its workers are separate processes that bypass the GIL, ProcessPoolExecutor is also the executor that delivers superior performance on genuinely CPU-bound tasks.

Conclusion

ThreadPoolExecutor is ideal for tasks that involve significant waiting, as it efficiently manages multiple threads within the same process. ProcessPoolExecutor is best for CPU-intensive tasks, providing true parallelism by using separate processes. Selecting the right executor enhances performance by aligning the concurrency model with the specific needs of your tasks.